package com.lyon.dmeo.storage.client.raft.service;

import cn.hutool.core.lang.Assert;
import cn.hutool.core.lang.Pair;
import com.lyon.demo.storage.client.api.core.core.Entry;
import com.lyon.demo.storage.client.api.core.core.AbstractStoreService;
import com.lyon.demo.storage.client.api.core.protocol.storage.Strategies;
import com.lyon.demo.storage.common.spi.annotation.SpiActivate;
import com.lyon.dmeo.storage.client.raft.core.exception.CommonException;
import lombok.extern.slf4j.Slf4j;

import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
 * @author LeeYan9
 * @since 2022-07-13
 */
@SuppressWarnings({"unused", "SpellCheckingInspection"})
@Slf4j
@SpiActivate(Strategies.MEMORY)
public class MemoryStoreService extends AbstractStoreService {


    /**
     * 先使用内存模型，后面再规划 Mmap
     */

    /**
     * <索引></索引>
     */
    private final ConcurrentLinkedQueue<Long> indexes = new ConcurrentLinkedQueue<>();

    /**
     * <索引,数据></>
     */
    private final ConcurrentHashMap<Long, Entry> entryMap = new ConcurrentHashMap<>();

    /**
     * <索引,数据></>
     */
    private final ConcurrentHashMap<String, ConcurrentHashMap<Object, Object>> pairMapByTag = new ConcurrentHashMap<>();

    /**
     * <索引,数据></>
     */
    private final ConcurrentHashMap<String, ConcurrentHashMap<Object, Long>> pairIndexMapByTag = new ConcurrentHashMap<>();

    /**
     * <索引></索引>
     */
    private final AtomicLong commitedIndex = new AtomicLong(-1);

    private final AtomicLong incrementIndex = new AtomicLong(-1);
    private final AtomicInteger incrementPos = new AtomicInteger(-1);


    @Override
    public void updateCommittedIndex(long currTerm, Long index) {
        Assert.isTrue(getMemberState().getCurrTerm() == currTerm);
        long oldCommitedIndex = commitedIndex.get();
        if (index > oldCommitedIndex) {
            boolean compareAndSet = commitedIndex.compareAndSet(oldCommitedIndex, index);
            if (!compareAndSet) {
                log.error("更新提交索引错误 form {} to {}", oldCommitedIndex, index);
            } else {
                memberState.updateCommittedIndex(commitedIndex.get());
            }
        }
    }

    @Override
    public Entry get(long index) {
        return entryMap.get(index);
    }

    @Override
    public long truncate(Entry entry, long term, String leaderId) {
        Assert.notNull(entry, CommonException.UNKNOWN);
        Assert.notNull(memberState.isFollower(), CommonException.UNKNOWN);
        Assert.isTrue(leaderId.equals(memberState.getLeaderId()), CommonException.UNKNOWN);
        Assert.isTrue(term == memberState.getCurrTerm(), CommonException.UNKNOWN);

        long truncateIndex = entry.getIndex();
        synchronized (entryMap) {
            Assert.isTrue(commitedIndex.get() <= truncateIndex);
            commitedIndex.set(truncateIndex);

            for (Long index : indexes) {
                if (index < truncateIndex) {
                    indexes.remove(index);
                    entryMap.remove(index);
                }
            }
            appendAsFollower(entry);
            return truncateIndex;
        }
    }

    @Override
    public long appendAsFollower(Entry entry) {
        long index = entry.getIndex();
        if (indexes.contains(index)) {
            return index;
        }
        synchronized (entryMap) {
            if (indexes.contains(index)) {
                return index;
            }
            ifProssibleAddPair(entry);
            indexes.add(index);
            entryMap.put(index, entry);
            changeLedgerIndex(index);
            log.info("[{}] Append as Follower {} {}", memberState.getSelfId(), entry.getIndex(), entry.getBody().length);
            // todo
        }
        return index;
    }

    private void ifProssibleAddPair(Entry entry) {
        Object data = entry.getData();
        if (data instanceof Pair) {
            Pair<?, ?> pair = (Pair<?, ?>) data;
            checkPairMapByTag(entry.getTag());
            checkPairIndexMapByTag(entry.getTag());
            checkPair(pair);
            pairMapByTag.get(getCanonicalName(entry.getTag())).put(pair.getKey(), pair.getValue());
            pairIndexMapByTag.get(getCanonicalName(entry.getTag())).put(pair.getKey(), entry.getIndex());
        }
    }


    @Override
    public void appendAsLeader(Entry entry) {
        Assert.notNull(entry,CommonException.DATA_NOT_NULL);
        long index = incrementIndex.incrementAndGet();
        int pos = incrementPos.incrementAndGet();
        synchronized (entryMap) {
            entry.setIndex(index);
            entry.setPos(pos);

            entryMap.put(index, entry);
            indexes.add(index);

            ifProssibleAddPair(entry);
            changeLedgerIndex(index);
            log.info("[{}] Append as Leader {} {}", memberState.getSelfId(), entry.getIndex(), entry.getBody().length);
        }
    }

    private void checkPair(Pair<?, ?> pair) {
        Assert.notNull(pair, CommonException.DATA_NOT_NULL);
        Assert.notNull(pair.getKey(), CommonException.DATA_NOT_NULL);
    }


    private void checkPairIndexMapByTag(String tag) {
        synchronized (pairIndexMapByTag) {
            ConcurrentHashMap<Object, Long> indexMap = pairIndexMapByTag.get(tag);
            if (Objects.isNull(indexMap)) {
                pairIndexMapByTag.put(getCanonicalName(tag), new ConcurrentHashMap<>(16));
            }
        }
    }

    private void checkPairMapByTag(String tag) {
        synchronized (pairMapByTag) {
            ConcurrentHashMap<Object, Object> map = pairMapByTag.get(tag);
            if (Objects.isNull(map)) {
                pairMapByTag.put(getCanonicalName(tag), new ConcurrentHashMap<>(16));
            }
        }
    }

    private void changeLedgerIndex(long index) {
        this.endIndex = index;
        if (this.beginIndex == -1) {
            this.beginIndex = index;
        }
        memberState.updateLedgerIndex(beginIndex, endIndex);
    }

    @Override
    public Entry hitEntry(long index) {
        if (index < commitedIndex.get()) {
            return null;
        }
        return get(index);
    }

    @Override
    public <K> Entry hitEntry(String tag, K key) {
        checkPairMapByTag(tag);
        checkPairIndexMapByTag(tag);
        Long index = pairIndexMapByTag.get(getCanonicalName(tag)).get(key);
        if (index != null && index >= committedIndex) {
            return entryMap.get(index);
        }
        return null;
    }

    private String getCanonicalName(String tag) {
        return tag.toUpperCase();
    }

}
